use std::collections::HashMap;
use std::fmt::Debug;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use log::{error, info, warn};
use serde::Deserialize;
use serde_json::Value;
use ureq::{Agent, Error};

use crate::datasource::record::Record;

/// Crate name, taken from Cargo at compile time; used in the HTTP User-Agent header.
const NAME: &str = env!("CARGO_PKG_NAME");
/// Crate version, taken from Cargo at compile time; used in the HTTP User-Agent header.
const VERSION: &str = env!("CARGO_PKG_VERSION");

// Values written to `rec.task.status` by `EndPoint::push`.

/// The insert/update or delete request completed without error.
const RECORD_STATUS_OK: i8 = 1;
/// The server answered with an error status (`ureq::Error::Status`); not retried.
const RECORD_STATUS_SERVER_ERROR: i8 = 2;
/// `rec.task.ops` was not one of the handled operation codes (1, 2 or 3).
const RECORD_STATUS_OPS_ERROR: i8 = 3;
/// The request still failed with a transport error after the last retry.
const RECORD_STATUS_NETWORK_ERROR: i8 = 4;
/// The record's value was JSON `null`, so nothing was sent.
const RECORD_STATUS_OBJECT_NOT_EXISTS: i8 = 5;

/// Connection settings for one indexea endpoint.
///
/// Authentication uses either a fixed `access_token` or, when that is absent, the OAuth
/// settings (`oauth_url`, `client_id`, `client_secret`).
#[derive(Deserialize, PartialEq, Debug)]
pub struct EndPoint {
    /// Base URL of the API. Request paths such as `records/{app}/{index}` are appended to
    /// it verbatim.
    /// NOTE(review): presumably expected to end with `/` — confirm against the config.
    pub url: String,
    /// Optional proxy specification handed to `ureq::Proxy::new`; if it does not parse,
    /// no proxy is used.
    pub proxy: Option<String>,
    /// Fixed bearer token. When set, `token()` returns it and no OAuth request is made.
    pub access_token: Option<String>,
    /// Base URL of the OAuth service; `/authorize` and `/token` are appended to it.
    pub oauth_url: Option<String>,
    /// OAuth client id sent to both the authorize and the token request.
    pub client_id: Option<String>,
    /// OAuth client secret sent with the token request.
    pub client_secret: Option<String>,
    /// Target indices, keyed by the name that a record's `index` field is looked up with.
    pub indices: HashMap<String, IndexeaIndex>,
}

/// Identifies one index on the indexea side; both fields become path segments of the
/// record URL (`records/{app}/{index}`).
#[derive(Deserialize, PartialEq, Debug)]
pub struct IndexeaIndex {
    /// Application identifier (first path segment after `records/`).
    pub app: String,
    /// Index number within the application (second path segment).
    pub index: u32,
}

/// Access token used for the `Authorization: Bearer …` header.
///
/// Either deserialized from the OAuth token response or built locally from a fixed
/// `access_token` in the endpoint configuration.
#[derive(Deserialize, PartialEq, Debug)]
pub struct Token {
    /// Token type as reported by the token response (empty for locally built tokens).
    token_type: String,
    /// The bearer token value itself.
    access_token: String,
    /// Lifetime that `is_expired` adds to `created` (both treated as seconds).
    /// Locally built tokens from a fixed access token use `-1`.
    expires_in: i32,
    /// Scope as reported by the token response (empty for locally built tokens).
    scope: String,
    /// Unix time in seconds at which the token was obtained. Never read from the
    /// response (it stays at the default `0` after deserialization) and is filled in
    /// by `EndPoint::request_token`.
    #[serde(skip_deserializing)]
    created: u64,
}

/// JSON body that `push` tries to parse from a response with an error status.
#[derive(Deserialize, PartialEq, Debug)]
pub struct Message {
    /// Error code; logged as `code` when a push fails.
    pub error: u32,
    /// Error description; logged as `reason` when a push fails.
    pub message: String,
}

impl EndPoint {
    fn client(&self) -> Agent {
        let os = if let Some(os) = sysinfo::System::long_os_version() {
            os
        } else {
            "Unknown".to_string()
        };
        let agent_builder = ureq::builder()
            .timeout_connect(Duration::from_secs(1))
            .timeout(Duration::from_secs(60))
            .user_agent(format!("{}/{} ({})", NAME, VERSION, os.trim()).as_str());
        if let Some(proxy_url) = &self.proxy {
            if let Ok(proxy) = ureq::Proxy::new(proxy_url) {
                return agent_builder.proxy(proxy).build();
            }
        }
        agent_builder.build()
    }

    /// Checks that the endpoint answers: sends a GET to `url` and reads the whole body.
    ///
    /// # Errors
    /// Returns the `ureq` error when the request fails or the body cannot be read.
    pub fn test(&self) -> Result<(), Error> {
        let agent = self.client();
        let response = agent.get(&self.url).call()?;
        // The body itself is irrelevant; reading it only proves the transfer completes.
        response.into_string()?;
        Ok(())
    }

    /// Returns the token to authenticate with.
    ///
    /// A configured `access_token` is wrapped as-is (with `expires_in` of `-1` and
    /// `created` of `0`); otherwise a new token is requested from the OAuth service.
    ///
    /// # Errors
    /// Propagates any error of the OAuth request.
    pub fn token(&self) -> Result<Token, Error> {
        match &self.access_token {
            Some(fixed) => Ok(Token {
                token_type: String::new(),
                access_token: fixed.to_owned(),
                expires_in: -1,
                scope: String::new(),
                created: 0,
            }),
            None => self.request_token(),
        }
    }

    /// get oauth token if it expired
    fn request_token(&self) -> Result<Token, Error> {
        let client = self.client();
        let authorize_url = format!(
            "{}/authorize?client_id={}&response_type=code&scope=WRITE",
            self.oauth_url.as_ref().unwrap(),
            self.client_id.as_ref().unwrap()
        );
        let code: String = client.get(&authorize_url).call()?.into_string()?;
        let token_url =
            format!("{}/token?grant_type=authorization_code", self.oauth_url.as_ref().unwrap());
        let mut token: Token = client
            .post(&token_url)
            .send_form(&[
                ("code", &code),
                ("client_id", &self.client_id.as_ref().unwrap()),
                ("client_secret", &self.client_secret.as_ref().unwrap()),
            ])?
            .into_json()?;

        let since_the_epoch =
            SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards");
        token.created = since_the_epoch.as_secs();

        Ok(token)
    }

    /// Push records to indexea
    pub fn push<'a>(&self, token: &'a Token, records: &'a mut Vec<Record>) -> &'a Vec<Record> {
        //TODO 使用批量处理进行优化
        for rec in &mut *records {
            if rec.value == serde_json::Value::Null {
                rec.task.status = RECORD_STATUS_OBJECT_NOT_EXISTS;
                warn!("[{}] object not found, value = {}.", rec.task.task, rec.task.value);
            } else if let Some(idx) = self.indices.get(&rec.index) {
                let mut try_times = 1;
                'retry: loop {
                    let push_begin_time = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .expect("Time went backwards");
                    let result = match rec.task.ops {
                        1 | 2 => self.inserts(token, &vec![rec.value.clone()], &idx),
                        3 => self.delete(token, &rec.value, &idx),
                        _ => {
                            rec.task.status = RECORD_STATUS_OPS_ERROR;
                            break 'retry;
                        }
                    };
                    match result {
                        Ok(_) => {
                            let push_end_time = std::time::SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Time went backwards");
                            rec.task.status = RECORD_STATUS_OK;
                            let action = if rec.task.ops == 3 { "delete" } else { "push" };
                            info!(
                                "{} to indexea [{},{}] in {}ms",
                                action,
                                idx.app,
                                idx.index,
                                push_end_time.as_millis() - push_begin_time.as_millis()
                            );
                            break 'retry;
                        }
                        Err(Error::Status(code, resp)) => {
                            rec.task.status = RECORD_STATUS_SERVER_ERROR;
                            if let Ok(msg) = resp.into_json::<Message>() {
                                error!(
                                    "failed to push record to [\"{}\"], code: {}, reason: {}",
                                    self.url, msg.error, msg.message
                                )
                            } else {
                                error!(
                                    "failed to push record to [\"{}\"], code: {}",
                                    self.url, code
                                )
                            };
                            break 'retry;
                        }
                        Err(Error::Transport(t)) => {
                            error!(
                                "failed to push record, reason: {:?}, waiting for retry...",
                                t.to_string()
                            );
                            if try_times >= 5 {
                                rec.task.status = RECORD_STATUS_NETWORK_ERROR;
                                break 'retry;
                            }
                            //sleep and retry
                            try_times = try_times + 1;
                            thread::sleep(std::time::Duration::from_secs(5));
                        }
                    }
                }
            }
        }
        records
    }

    /// batch insert or update
    ///
    /// Sends `records` as one JSON array with a PUT to `{url}records/{app}/{index}`, using
    /// the token as bearer authorization. An empty batch sends nothing.
    ///
    /// A response whose status is not 200 (but which `ureq` does not treat as an error) is
    /// logged and still reported as `Ok(())`.
    ///
    /// # Errors
    /// Returns the `ureq` error when the request fails or the server answers with an error
    /// status.
    pub fn inserts(
        &self,
        token: &Token,
        records: &[Value],
        idx: &IndexeaIndex,
    ) -> Result<(), Error> {
        if records.is_empty() {
            return Ok(());
        }
        let url = format!("{}records/{}/{}", self.url, idx.app, idx.index);
        let resp = self
            .client()
            .put(&url)
            .set("Authorization", &format!("Bearer {}", token.access_token))
            .send_json(records)?;
        let status = resp.status();
        if status != 200 {
            // The body is read for logging only: a failure to read it must not turn an
            // already completed request into an error that callers would retry.
            let reason = resp.into_string().unwrap_or_default();
            error!(
                "failed to push records to indexea [{},{}], code:{}, reason: {}",
                idx.app, idx.index, status, reason
            );
        }
        Ok(())
    }

    fn delete(&self, token: &Token, value: &Value, idx: &IndexeaIndex) -> Result<(), Error> {
        if let Value::String(_id) = value {
            let url = format!("{}records/{}/{}?_id={}", self.url, idx.app, idx.index, _id);
            let resp = self
                .client()
                .delete(&url)
                .set("Authorization", &format!("Bearer {}", token.access_token))
                .call()?;
            if resp.status() != 200 {
                error!(
                    "failed to delete record from indexea [{},{}], code:{}, reason: {}",
                    idx.app,
                    idx.index,
                    resp.status(),
                    resp.into_string()?
                );
            }
        }
        Ok(())
    }
}

impl Token {
    /// Creates an empty placeholder token.
    ///
    /// With `created` and `expires_in` both `0` it is reported as expired, so it can be
    /// used as the initial value before a real token has been obtained.
    pub fn new() -> Token {
        Token {
            token_type: String::new(),
            access_token: String::new(),
            expires_in: 0,
            scope: String::new(),
            created: 0,
        }
    }

    /// Returns `true` once the current time is past `created + expires_in` (seconds).
    ///
    /// A negative `expires_in` (such as the `-1` used for fixed access tokens) means the
    /// token never expires.
    ///
    /// # Panics
    /// Panics if the system clock is set before the Unix epoch.
    pub fn is_expired(&self) -> bool {
        // Casting a negative lifetime to `u64` would produce a huge value and overflow the
        // addition below, so handle it explicitly.
        if self.expires_in < 0 {
            return false;
        }
        let now =
            SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
        now > self.created.saturating_add(self.expires_in as u64)
    }
}

/// Deep copy of the endpoint configuration: every field is cloned.
///
/// Written out by hand; the struct literal makes the compiler flag this impl when a field
/// is added to `EndPoint`.
impl Clone for EndPoint {
    fn clone(&self) -> EndPoint {
        EndPoint {
            url: self.url.clone(),
            proxy: self.proxy.clone(),
            access_token: self.access_token.clone(),
            oauth_url: self.oauth_url.clone(),
            client_id: self.client_id.clone(),
            client_secret: self.client_secret.clone(),
            indices: self.indices.clone(),
        }
    }
}

/// Copies both the application identifier and the index number.
impl Clone for IndexeaIndex {
    fn clone(&self) -> IndexeaIndex {
        IndexeaIndex { app: self.app.clone(), index: self.index }
    }
}

/// Copies every field of the token, including its `created` timestamp.
impl Clone for Token {
    fn clone(&self) -> Token {
        Token {
            token_type: self.token_type.clone(),
            access_token: self.access_token.clone(),
            expires_in: self.expires_in,
            scope: self.scope.clone(),
            created: self.created,
        }
    }
}
